# Load packages
pacman::p_load(parallel)
# Default chunk options applied to every chunk when the document is knit
knitr::opts_chunk$set(
  echo = TRUE,
  message = FALSE,
  warning = FALSE,
  comment = NA,
  tidy = TRUE,
  results = "hold",
  cache = FALSE
)
# Count the threads that detectCores() reports for this machine
noquote("Number of threads")
numCores <- parallel::detectCores()
# detectCores() returns NA when the count cannot be determined; fall back to a
# single worker so the later makeCluster(numCores) calls still succeed
if (is.na(numCores)) {
  numCores <- 1L
}
numCores
[1] Number of threads
[1] 8
To use the parallel version of `lapply`, first recall how `lapply` itself works.
# lapply with one input: each element is handed to paste() as-is
noquote("lapply with one input")
lapply(X = 1:3, FUN = paste)
# lapply shorthand: the function is written inline as an anonymous function
noquote("lapply shorthand")
lapply(X = 1:3, FUN = function(x) {
  paste("task", x)
})
# lapply longhand: a named function is defined first, then passed to lapply
noquote("lapply longhand")
## save as function
task_func <- function(x) {
  # Prefix the input with the word "task", separated by a single space
  label <- paste("task", x, sep = " ")
  label
}
## apply function in lapply
lapply(1:3, task_func)
[1] lapply with one input
[[1]]
[1] "1"
[[2]]
[1] "2"
[[3]]
[1] "3"
[1] lapply shorthand
[[1]]
[1] "task 1"
[[2]]
[1] "task 2"
[[3]]
[1] "task 3"
[1] lapply longhand
[[1]]
[1] "task 1"
[[2]]
[1] "task 2"
[[3]]
[1] "task 3"
Let’s let it sleep on one thread for 24 seconds. The time reflects the total task time, plus any overhead a computer naturally generates.
# Time one uninterrupted 24-second sleep on a single thread
save1 <- system.time({
  Sys.sleep(24)
})
# Label for the timing printed next
noquote("Time in seconds")
save1
[1] Time in seconds
user system elapsed
0.00 0.00 24.35
Let’s let it sleep for 24 seconds, but divide the 24 seconds equally on each of the 8 threads so that each thread sleeps for 3 seconds.
# Start one worker per detected thread
cl <- parallel::makeCluster(numCores)
# Split the 24 seconds evenly: each worker sleeps 24/numCores seconds
# (3 sec each when there are 8 threads)
save2 <- system.time({
  parallel::parLapply(
    cl = cl,
    X = rep(24 / numCores, times = numCores),
    fun = Sys.sleep
  )
})
# Shut the workers down
parallel::stopCluster(cl)
# Label for the timing printed next
noquote("Time in seconds")
save2
[1] Time in seconds
user system elapsed
0.00 0.00 3.04
Previously, the 24 seconds were divided evenly on 8 threads.
Here, we will have unequal tasks that take 10 sec, 5 sec, and six 1.5 sec tasks.
By default, parallel processing will take at least as long as the longest task (10 sec).
Using an if statement, we can break the longest task into 7 parts and spread them over every thread except the one running the second-longest task (5 sec), which brings the total time down to roughly the length of that second-longest task.
# Start one worker per detected thread
cl <- parallel::makeCluster(numCores)
# Run 24 seconds of sleep split into unequal tasks, one task per list element
save3 <- system.time({
  parallel::parLapply(
    cl = cl,
    X = c(
      10,                   # one 10-second task
      5,                    # one 5-second task
      rep(1.5, times = 6)   # six 1.5-second tasks
    ),
    fun = Sys.sleep
  )
})
# Shut the workers down
parallel::stopCluster(cl)
# Label for the timing printed next
noquote("Time in seconds")
save3
[1] Time in seconds
user system elapsed
0.00 0.00 10.17
# Sleep for a rebalanced duration so that the longest task is shared out.
#
# The longest task is cut into `parts` equal slices. The task that was the
# longest keeps a single slice, the second-longest task is left unchanged, and
# every other (short) task sleeps for its own length plus one slice.
#
# Args:
#   x:        Original task length in seconds (a single number).
#   longCL:   Length of the longest task in seconds.
#   secondCL: Length of the second-longest task in seconds.
#   parts:    Number of slices the longest task is cut into.
#
# Returns:
#   Whatever Sys.sleep() returns (invisible NULL); called for its delay.
if_func <- function(x, longCL = 10, secondCL = 5, parts = 7) {
  # One slice of the longest task
  slice <- longCL / parts
  # Compare task lengths with a tolerance instead of `==` on doubles
  same_length <- function(a, b) isTRUE(all.equal(a, b))

  if (same_length(x, longCL)) {
    # Longest task: keep only one slice here
    Sys.sleep(slice)
  } else if (same_length(x, secondCL)) {
    # Second-longest task: keep this the same length
    Sys.sleep(secondCL)
  } else {
    # Short task: its own length plus one slice of the longest task
    Sys.sleep(x + slice)
  }
}
# Start one worker per detected thread
cl <- parallel::makeCluster(numCores)
# Same unequal 24 seconds of tasks, but each length is routed through if_func
save4 <- system.time({
  parallel::parLapply(
    cl = cl,
    X = c(
      10,                   # one 10-second task
      5,                    # one 5-second task
      rep(1.5, times = 6)   # six 1.5-second tasks
    ),
    fun = if_func
  )
})
# Shut the workers down
parallel::stopCluster(cl)
# Label for the timing printed next
noquote("Time in seconds")
save4
[1] Time in seconds
user system elapsed
0.01 0.00 5.13
We had a task that took 24 seconds.
Doing nothing, it took that time plus the normal overhead for a total of 24.35 seconds.
Dividing the task along 8 threads evenly took a total of 3.04 seconds.
For unequal tasks (10s, 5s, six 1.5s), simply using the `parLapply` function took a total of 10.17 seconds, about as long as the longest task.
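For unequal tasks (10s, 5s, six 1.5s), using a customized function that redistributes the longest task took a total of 5.13 seconds, about as long as the second-longest task.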